18.4 线程

当newproc1成功创建G任务后,会尝试用wakep唤醒M执行任务。

proc1.go

func wakep() { // 被唤醒的线程需要绑定 P,累加自旋计数,避免 newproc1 唤醒过多线程 if !cas(&sched.nmspinning, 0, 1) { return } startm(nil, true) }

func startm(p *p, spinning bool) { // 如果没有指定 P,尝试获取空闲 P if p == nil { p = pidleget()

 // 获取失败,终止 
 if _p_ == nil { 
     // 递减自旋计数 
     if spinning { 
         xadd(&sched.nmspinning, -1) 
     } 
     return 
 } 

}

// 获取休眠的闲置 M mp := mget()

// 如没有闲置 M,新建 if mp == nil { // 默认启动函数 // 主要是判断 M.nextp 是否有暂存的 P,以此调整自旋计数 var fn func() if spinning { fn = mspinning } newm(fn, p) return }

// 设置自旋状态和暂存 P mp.spinning = spinning mp.nextp.set(p)

// 唤醒 M notewakeup(&mp.park) }

notewakeup/notesleep实现细节参见后文。

和前文G对象复用类似,这个过程同样有闲置获取和新建两种方式。先不去理会闲置列表,看看M究竟如何创建,如何包装系统线程。

runtime2.go

type m struct { g0 *g // 提供系统栈空间 mstartfn func() // 启动函数 curg *g // 当前运行 G p puintptr // 绑定 P nextp puintptr // 临时存放 P spinning bool // 自旋状态 park note // 休眠锁 schedlink muintptr // 链表 }

proc1.go

func newm(fn func(), p *p) { // 创建 M 对象 mp := allocm(p, fn)

// 暂存 P mp.nextp.set(p)

// 创建系统线程 newosproc(mp, unsafe.Pointer(mp.g0.stack.hi)) }

func allocm(p *p, fn func()) *m { mp := new(m) mp.mstartfn = fn // 启动函数 mcommoninit(mp) // 初始化

// 创建 g0 // In case of cgo or Solaris, pthread_create will make us a stack. // Windows and Plan 9 will layout sched stack on OS stack. if iscgo || GOOS "solaris" || GOOS “windows” || GOOS == “plan9” { mp.g0 = malg(-1) } else { mp.g0 = malg(8192 * stackGuardMultiplier) } mp.g0.m = mp

return mp }

M最特别的就是自带一个名为g0,默认8 KB栈内存的G对象属性。它的栈内存地址被传给newosproc函数,作为系统线程的默认堆栈空间(并非所有系统都支持)。

os1_linux.go

const cloneFlags = _CLONE_VM | /* share memory / _CLONE_FS | / share cwd, etc / _CLONE_FILES | / share fd table / _CLONE_SIGHAND | / share sig handler table / _CLONE_THREAD / revisit - okay for now */

func newosproc(mp *m, stk unsafe.Pointer) { ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart))) }

关于系统调用clone的更多信息,请参考man 2手册。

os1_windows.go

func newosproc(mp *m, stk unsafe.Pointer) { const _STACK_SIZE_PARAM_IS_A_RESERVATION = 0x00010000 thandle := stdcall6(_CreateThread, 0, 0x20000, funcPC(tstart_stdcall), uintptr(unsafe.Pointer(mp)), _STACK_SIZE_PARAM_IS_A_RESERVATION, 0) if thandle == 0 { print(“runtime: failed to create new OS thread (have ”, mcount(), ” already; errno=”, getlasterror(), ”)\n”) throw(“runtime.newosproc”) } }

Windows API CreateThread不支持自定义线程堆栈。

在进程执行过程中,有两类代码需要运行。一类自然是用户逻辑,直接使用G栈内存;另一类是运行时管理指令,它并不便于直接在用户栈上执行,因为这需要处理与用户逻辑现场有关的一大堆事务。

举例来说,G任务可在中途暂停,放回队列后由其他M获取执行。如果不更改执行栈,可能会造成多个线程共享内存,从而引发混乱。另外,在执行垃圾回收操作时,如何收缩依旧被线程持有的G栈空间?因此,当需要执行管理指令时,会将线程栈临时切换到g0,与用户逻辑彻底隔离。

其实,在前文就经常看到systemstack这种执行方式,它就是切换到g0栈后再执行运行时相关管理操作的。

proc1.go

func newproc(siz int32, fn *funcval) { systemstack(func() { newproc1(fn, (*uint8)(argp), siz, 0, pc) }) }

asm_amd64.s

TEXT runtime·systemstack(SB), NOSPLIT, $0-8 MOVQ fn+0(FP), DI // DI = fn MOVQ g(CX), AX // AX = g MOVQ g_m(AX), BX // BX = m

MOVQ m_g0(BX), DX // DX = g0 CMPQ AX, DX // 如果当前g已经是 g0,那么无须切换 JEQ noswitch

MOVQ m_curg(BX), R8 // 当前 g CMPQ AX, R8 // 如果是用户逻辑 g,切换 JEQ switch // Bad: g is not gsignal, not g0, not curg. What is it? MOVQ $runtime·badsystemstack(SB), AX CALL AX

switch: // 将 G 状态保存到 sched MOVQ $runtime·systemstack_switch(SB), SI MOVQ SI, (g_sched+gobuf_pc)(AX) MOVQ SP, (g_sched+gobuf_sp)(AX) MOVQ AX, (g_sched+gobuf_g)(AX) MOVQ BP, (g_sched+gobuf_bp)(AX)

// 切换到 g0.stack MOVQ DX, g(CX) // DX = g0 MOVQ (g_sched+gobuf_sp)(DX), BX // 从 g0.sched 获取 SP SUBQ runtime·mstart(SB), DX MOVQ DX, 0(BX) MOVQ BX, SP // 通过调整 SP 寄存器值来切换栈内存

// 执行系统管理函数 MOVQ DI, DX // DI = fn MOVQ 0(DI), DI CALL DI

// 切换回 G,恢复执行现场 MOVQ g(CX), AX MOVQ g_m(AX), BX MOVQ m_curg(BX), AX MOVQ AX, g(CX) MOVQ (g_sched+gobuf_sp)(AX), SP MOVQ $0, (g_sched+gobuf_sp)(AX) RET

noswitch: // already on m stack, just call directly MOVQ DI, DX MOVQ 0(DI), DI CALL DI RET

从这段代码,我们可以看出g0为什么同样是G对象,而不直接用stack的原因。

M初始化操作会检查已有的M数量,如超出最大限制(默认为10,000)会导致进程崩溃。所有M被添加到allm链表,且不被释放。

runtime2.go

var allm *m

proc1.go

func mcommoninit(mp *m) { mp.id = sched.mcount sched.mcount++ checkmcount() mpreinit(mp)

mp.alllink = allm atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp)) }

func checkmcount() { if sched.mcount > sched.maxmcount { throw(“thread exhaustion”) } }

可用runtime/debug.SetMaxThreads修改最大线程数量限制,但仅建议在测试阶段通过设置较小值作为错误触发条件。

回到wakep/startm流程,默认优先选用闲置M,只是这个闲置的M从何而来?

runtime2.go

type schedt struct { midle muintptr // 闲置 M 链表 nmidle int32 // 闲置的 M 数量 mcount int32 // 已创建的 M 总数 maxmcount int32 // M 最大闲置数 }

proc1.go

// 从空闲链表获取 M func mget() *m { mp := sched.midle.ptr() if mp != nil { sched.midle = mp.schedlink sched.nmidle— } return mp }

被唤醒而进入工作状态的M,会陷入调度循环,从各种可能场所获取并执行G任务。只有当彻底找不到可执行任务,或因任务用时过长、系统调用阻塞等原因被剥夺P时,M才会进入休眠状态。

proc1.go

// 停止 M,使其休眠 func stopm() { g := getg()

// 取消自旋状态 if g.m.spinning { g.m.spinning = false xadd(&sched.nmspinning, -1) }

retry: // 放回闲置队列 mput(g.m) // 休眠,等待被唤醒 notesleep(&g.m.park) noteclear(&g.m.park)

// 绑定 P acquirep(g.m.nextp.ptr()) g.m.nextp = 0 }

// 将 M 放入闲置链表 func mput(mp *m) { mp.schedlink = sched.midle sched.midle.set(mp) sched.nmidle++ }

我们允许进程里有成千上万的并发任务G,但最好不要有太多的M。且不说通过系统调用创建线程本身就有很大的性能损耗,大量闲置且不被回收的线程、M对象、g0栈空间都是资源浪费。好在这种情形极少出现,不过还是建议在生产部署前进行严格的测试。

下面是利用cgo调用sleep syscall来生成大量M的示例。

test.go

package main

import ( “sync” “time” )

// include <unistd.h> import “C”

func main() { var wg sync.WaitGroup wg.Add(1000)

for i := 0; i < 1000; i++ { 
    go func() { 
        C.sleep(1) 
        wg.Done() 
    }() 
} 

wg.Wait() 
println("done!") 
time.Sleep(time.Second * 5) 

}

利用GODEBUG输出调度器状态,你会看到大量闲置线程。

$ go build -o test test

$ GODEBUG=“schedtrace=1000” ./test

SCHED 0ms: gomaxprocs=2 idleprocs=1 threads=3 spinningthreads=0 idlethreads=0 runqueue=0 [0 0] SCHED 1006ms: gomaxprocs=2 idleprocs=0 threads=728 spinningthreads=0 idlethreads=0 runqueue=125 [113 33] SCHED 2009ms: gomaxprocs=2 idleprocs=2 threads=858 spinningthreads=0 idlethreads=590 runqueue=0 [0 0] done! SCHED 3019ms: gomaxprocs=2 idleprocs=2 threads=858 spinningthreads=0 idlethreads=855 runqueue=0 [0 0] SCHED 4029ms: gomaxprocs=2 idleprocs=2 threads=858 spinningthreads=0 idlethreads=855 runqueue=0 [0 0] SCHED 5038ms: gomaxprocs=2 idleprocs=2 threads=858 spinningthreads=0 idlethreads=855 runqueue=0 [0 0] SCHED 6048ms: gomaxprocs=2 idleprocs=2 threads=858 spinningthreads=0 idlethreads=855 runqueue=0 [0 0]

runqueue输出全局队列,以及P本地队列的G任务数量。

可将done后的等待时间修改得更长(比如10分钟),用来观察垃圾回收和系统监控等机制是否会影响idlethreads数量。

$ GODEBUG=“gctrace=1,schedtrace=1000” ./test

除线程数量外,程序执行时间(user、sys)也有很大差别,可以简单对比一下。

func main() { var wg sync.WaitGroup wg.Add(1000)

for i := 0; i < 1000; i++ { 
    go func() { 
        C.sleep(1)          // 测试 1 
        // time.Sleep(time.Second)     // 测试 2 

        wg.Done() 
    }() 
} 

wg.Wait() 

}

$ go build -o test1 test.go && time ./test1

real 0m1.159s user 0m0.056s sys 0m0.105s

$ go build -o test2 test.go && time ./test2

real 0m1.022s user 0m0.006s sys 0m0.006s

输出结果中user和sys分别表示用户态和内核态执行时间,多核累加。

标准库封装的time.Sleep针对goroutine进行了改进,并未使用syscall。当然,这个示例和测试结果也仅用于演示,具体问题还须具体分析。